[draft] [fix] [client] fix ack failed when consumer is reconnecting #21928
// batch without ackSet.
{CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0)},
{CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0)},
// batch with ackSe.
- // batch with ackSe.
+ // batch with ackSet.
Fixed
public void testImmediateAckWhenReconnecting(CommandAck.AckType ackType, MessageId messageId) throws Exception {
    final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
    final String subscriptionName = "s1";
    PulsarClient delayConnectClient = createDelayReconnectClient();
The client should be closed?
Fixed
@@ -871,6 +871,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
    if (!(firstTimeConnect && hasParentConsumer) && getCurrentReceiverQueueSize() != 0) {
        increaseAvailablePermits(cnx, getCurrentReceiverQueueSize());
    }
    acknowledgmentsGroupingTracker.afterConsumerReconnected();
Why not call the flush() method directly?
+1
Fixed
if (!queueDueToConnecting
        && (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) {
Will it break the behavior described in this comment?
// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.
If the consumer is reconnecting but the ack has properties, we will also group the acks, which is not expected?
@codelipenghui Yes. But this behavior was only added for the TwoPhaseCompactor to avoid latency when seeking; the client side has no way to attach properties. See
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
Lines 229 to 230 in 55520bd
.thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
        Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
Currently, if acknowledge is called during reconnection, phaseTwoSeekThenLoop will fail.
From my perspective, we should also queue the ACK requests and flush them after the consumer is connected. @poorbarcode
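For illustration, the queue-then-flush idea from this comment can be sketched in isolation. This is a minimal standalone sketch, not the Pulsar client code: ReconnectAwareAcker, its connected flag, and the sender callback are hypothetical names made up for the example, and message IDs are reduced to plain long values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for an ack tracker; not part of the Pulsar API. */
class ReconnectAwareAcker {
    // Hypothetical callback that sends a single ack to the broker.
    private final Consumer<Long> sender;
    // Acks requested while the consumer had no connection.
    private final List<Long> pending = new ArrayList<>();
    private boolean connected;

    ReconnectAwareAcker(Consumer<Long> sender) {
        this.sender = sender;
    }

    /** Sends the ack immediately when connected; otherwise queues it. */
    synchronized void ack(long messageId) {
        if (connected) {
            sender.accept(messageId);
        } else {
            pending.add(messageId);
        }
    }

    /** Called once the connection is (re)established: flushes queued acks in order. */
    synchronized void onConnected() {
        connected = true;
        for (long id : pending) {
            sender.accept(id);
        }
        pending.clear();
    }

    /** Called when the connection is lost; later acks are queued again. */
    synchronized void onDisconnected() {
        connected = false;
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

With this shape, an ack issued during reconnection is neither dropped nor failed; it is delivered as soon as onConnected() runs. The real tracker would additionally have to return a future per ack and handle cumulative acks and properties, which the sketch leaves out.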
Agree with @BewareMyPower.
I am trying to split this PR into the following:
- part-1: add a new component AcknowledgmentCache to support caching acknowledgments, including those that carry the properties argument.
- part-2: split PersistentAcknowledgmentsGroupingTracker into two implementations:
  - cache and batch the acks
  - immediately ack
- part-3: fix the issue
Marked current PR as Draft
split PersistentAcknowledgmentsGroupingTracker into two implementations
It's reasonable. BTW, the C++ client is already implemented in such a way:
- AckGroupingEnabled: cache the acknowledgments and flush them on a timeout or when a count is reached
- AckGroupingDisabled: acknowledge immediately
The naming is not good, though (but it is consistent with other impl classes in the library).
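As a rough picture of what such a split could look like on the Java side, here is a minimal sketch with one interface and two implementations. All names (AckStrategy, ImmediateAckStrategy, GroupingAckStrategy, the sender callback) are hypothetical and are not taken from the Pulsar Java or C++ clients. Only the count-based flush is shown; a timeout-based flush is assumed to be triggered by the caller through flush().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical interface; the names here are illustrative, not Pulsar classes. */
interface AckStrategy {
    void ack(long messageId);

    void flush();
}

/** Grouping disabled: every ack is sent as soon as it is requested. */
class ImmediateAckStrategy implements AckStrategy {
    private final Consumer<List<Long>> sender;

    ImmediateAckStrategy(Consumer<List<Long>> sender) {
        this.sender = sender;
    }

    @Override
    public void ack(long messageId) {
        sender.accept(List.of(messageId));
    }

    @Override
    public void flush() {
        // Nothing is ever cached, so there is nothing to flush.
    }
}

/** Grouping enabled: acks are cached and sent as one batch on flush or when the cache is full. */
class GroupingAckStrategy implements AckStrategy {
    private final Consumer<List<Long>> sender;
    private final int maxGroupSize;
    private final List<Long> cache = new ArrayList<>();

    GroupingAckStrategy(Consumer<List<Long>> sender, int maxGroupSize) {
        this.sender = sender;
        this.maxGroupSize = maxGroupSize;
    }

    @Override
    public synchronized void ack(long messageId) {
        cache.add(messageId);
        if (cache.size() >= maxGroupSize) {
            flush();
        }
    }

    @Override
    public synchronized void flush() {
        if (cache.isEmpty()) {
            return;
        }
        // Hand over a copy so the sender never sees later mutations of the cache.
        sender.accept(new ArrayList<>(cache));
        cache.clear();
    }
}
```

Keeping the two behaviors in separate classes removes the acknowledgementGroupTimeMicros == 0 branch from every ack path; the choice is made once, when the tracker is constructed.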
private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) {
    if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties,
        boolean queueDueToConnecting) {
Method invocations like f(args..., true) and f(args..., false) are hard to read unless you jump to the implementation of f and see what the boolean parameter means.
You should add a new method like queueIndividualAck and call it directly. For example,
private CompletableFuture<Void> queueIndividualAck(MessageIdAdv messageId) {
    Optional<Lock> readLock = acquireReadLock();
    try {
        doIndividualAckAsync(messageId);
        return readLock.map(__ -> currentIndividualAckFuture).orElse(CompletableFuture.completedFuture(null));
    } finally {
        readLock.ifPresent(Lock::unlock);
        if (pendingIndividualAcks.size() >= maxAckGroupSize) {
            flush();
        }
    }
}

private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) {
    if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
        // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
        // uncommon condition since it's only used for the compaction subscription.
        return doImmediateAck(messageId, AckType.Individual, properties, null);
    } else {
        return queueIndividualAck(messageId);
    }
}
Then you don't have to add a false argument to all existing doIndividualAck calls. And in doImmediateAck, you can call queueIndividualAck directly.
private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, AckType ackType, Map<String, Long> properties,
                                               BitSetRecyclable bitSet) {
    ClientCnx cnx = consumer.getClientCnx();
    if (cnx == null && consumer.getState() == HandlerState.State.Connecting) {
        if (ackType == AckType.Cumulative) {
            return queueCumulativeAck(msgId);
        } else {
            return queueIndividualAck(msgId);
        }
    }
Sure, I will change the implementation of this PR as suggested in #21928 (comment)
@poorbarcode Please rebase
Motivation
The ack command will fail if the consumer is reconnecting and acknowledgmentGroupTime is set to 0. See https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L361-L364
Modifications
Call ack after the consumer is connected.
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x